This notebook shows how to run a Spark Streaming application from within a notebook. There are multiple limitations to be aware of.
The code can be found here: https://github.com/ibm-cds-labs/spark.samples/tree/master/streaming-twitter. The following code uses a pre-built jar that has been posted on the GitHub project, but you can replace it with your own URL if needed.
In [ ]:
%AddJar https://github.com/DTAIEB/demos/raw/master/streaming-twitter-assembly-1.5.jar -f
In [ ]:
val demo = com.ibm.cds.spark.samples.MessageHubStreamingTwitter
val config = demo.getConfig()
// Watson Tone Analyzer service
config.setConfig("watson.tone.url","https://gateway.watsonplatform.net/tone-analyzer-beta/api")
config.setConfig("watson.tone.password","XXXX")
config.setConfig("watson.tone.username","XXXX")
// Message Hub/Kafka service
config.setConfig("bootstrap.servers","kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094")
config.setConfig("api_key","XXXX")
config.setConfig("kafka.topic.tweet","twitter-spark")
config.setConfig("kafka.user.name","XXXX")
config.setConfig("kafka.user.password","XXXX")
config.setConfig("kafka_rest_url","https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443")
// Spark Streaming checkpointing configuration, using an Object Storage Swift container
config.setConfig("name","spark")
config.setConfig("auth_url","https://identity.open.softlayer.com")
config.setConfig("project_id","XXXX")
config.setConfig("region","dallas")
config.setConfig("user_id","XXXX")
config.setConfig("password","XXXX")
config.setConfig("checkpointDir", "swift://notebooks.spark/ssc")
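Every `XXXX` value above has to be replaced with real credentials, so it can help to catch leftover placeholders before starting the stream. The following is a minimal sketch and not part of the sample jar: `ConfigCheck` and `missingKeys` are hypothetical names, and the check works on a plain Scala `Map` rather than on the `config` object, whose read API is not shown in this notebook.

```scala
// Hypothetical helper (not part of the streaming-twitter jar): finds settings
// that are still blank or still carry the "XXXX" placeholder.
object ConfigCheck {
  // Placeholder this notebook uses for values the reader must supply
  val Placeholder = "XXXX"

  // Returns, in alphabetical order, the keys whose values are blank or the placeholder
  def missingKeys(settings: Map[String, String]): Seq[String] =
    settings.toSeq.collect {
      case (key, value) if value.trim.isEmpty || value.trim == Placeholder => key
    }.sorted
}
```

One way to use it is to keep the settings in a `Map`, call `ConfigCheck.missingKeys(settings)`, and, only when the result is empty, apply them with `settings.foreach { case (k, v) => config.setConfig(k, v) }`.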
Start a new Twitter stream that collects live tweets and enriches them with sentiment analysis scores. The stream runs for the duration specified in the second argument of the startTwitterStreaming method. Note: if no duration is specified, the stream runs until the stopTwitterStreaming method is called.
In [ ]:
demo.startTwitterStreaming(sc)
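To stop the stream automatically instead of calling `stopTwitterStreaming`, a duration can be passed as the second argument. The sketch below assumes that argument is a Spark Streaming `Duration` (this notebook does not state its type) and reuses the `demo` and `sc` values from the cells above. The 30-second value is only illustrative.

```scala
import org.apache.spark.streaming.Seconds

// Assumption: the second parameter is an org.apache.spark.streaming.Duration.
// Runs the stream for roughly 30 seconds, then stops it.
demo.startTwitterStreaming(sc, Seconds(30))
```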
In [ ]:
demo.stopTwitterStreaming